Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by Rx (RxScala).

Notebook focus

This notebook focuses on cleaning up Rx subscriptions when cells are re-evaluated. The current approach is too brittle for my taste.

Maven dependencies


In [5]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps com.github.haifengl smile-core 1.1.0 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps com.chuusai shapeless_2.10 2.3.0 --repository https://oss.sonatype.org/content/repositories/releases/ --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.21 --repository file:/Users/tmo/.m2/repository


Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking com.github.haifengl:smile-core:1.1.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/com/github/haifengl/smile-graph/1.1.0/smile-graph-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/com/github/haifengl/smile-data/1.1.0/smile-data-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/com/github/haifengl/smile-math/1.1.0/smile-math-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/com/github/haifengl/smile-core/1.1.0/smile-core-1.1.0.jar
Marking io.reactivex:rxscala_2.10:0.26.1 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxscala_2.10/0.26.1/rxscala_2.10-0.26.1.jar
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxjava/1.1.1/rxjava-1.1.1.jar
Marking com.softwaremill.quicklens:quicklens_2.10:1.4.4 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/repo1.maven.org/maven2/com/softwaremill/quicklens/quicklens_2.10/1.4.4/quicklens_2.10-1.4.4.jar
Marking com.chuusai:shapeless_2.10:2.3.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> https://oss.sonatype.org/content/repositories/releases/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/https/oss.sonatype.org/content/repositories/releases/com/chuusai/shapeless_2.10/2.3.0/shapeless_2.10-2.3.0.jar
Marking org.tmoerman:plongeur-spark_2.10:0.3.21 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7472380021669285779/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/org/tmoerman/plongeur-spark_2.10/0.3.21/plongeur-spark_2.10-0.3.21.jar

In [6]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar


Using cached version of declarativewidgets.jar

Import classes


In [7]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import shapeless.HNil

import org.tmoerman.plongeur.tda._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._

import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel



In [8]:
import java.util.concurrent.atomic.AtomicReference

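// Holds at most one live Subscription; storing a new one unsubscribes the previous one.
case class SubRef(ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {

    // Swap in the new subscription and unsubscribe the one it replaces, if any.
    def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(old => old.unsubscribe())

    // Option(null) is None, so this unsubscribes the current subscription and stores nothing.
    def reset(): Unit = update(null)

}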
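
The swap-and-unsubscribe behaviour of SubRef can be illustrated without RxScala on the classpath. The following is a minimal sketch, not the notebook's own code: FakeSubscription and Ref are hypothetical stand-ins for rx.lang.scala.Subscription and SubRef, and the sequence of calls simulates evaluating the same cell twice before resetting.

```scala
import java.util.concurrent.atomic.AtomicReference

object SubRefSketch {
  // Hypothetical stand-in for rx.lang.scala.Subscription, so the sketch runs without RxScala.
  final class FakeSubscription(val name: String) {
    @volatile var unsubscribed = false
    def unsubscribe(): Unit = { unsubscribed = true }
  }

  // Same idea as SubRef: hold at most one subscription at a time.
  final class Ref {
    private val ref = new AtomicReference[Option[FakeSubscription]](None)

    // Atomically store the new subscription and unsubscribe the one it replaces.
    def update(sub: FakeSubscription): Unit =
      ref.getAndSet(Option(sub)).foreach(_.unsubscribe())

    // Option(null) is None, so the current subscription is unsubscribed and nothing is stored.
    def reset(): Unit = update(null)

    def current: Option[FakeSubscription] = ref.get
  }

  // Simulate evaluating the same notebook cell twice, then resetting.
  val first  = new FakeSubscription("first evaluation")
  val second = new FakeSubscription("second evaluation")
  val holder = new Ref

  holder.update(first)
  holder.update(second)
  val firstStoppedAfterReeval  = first.unsubscribed   // captured before reset()
  val secondStoppedAfterReeval = second.unsubscribed  // captured before reset()

  holder.reset()
}
```

The holder has to outlive the cell that subscribes. If it were created in the same cell as the subscription, re-evaluating that cell would create a fresh holder and the old subscription would never be unsubscribed.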

Import polymer elements

This cell triggers Bower installations of the specified web components.

If it doesn't work, check whether Bower has sufficient permissions to install into the Jupyter /nbextensions folder.


In [9]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[9]:

Reactive TDA Machine

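Keep the references to Rx subscriptions in their own cells, apart from the cells that subscribe. Re-evaluating a subscribing cell then replaces and unsubscribes the previous subscription instead of leaking it.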


In [10]:
val in$_subRef = SubRef()

Instantiate a PublishSubject. This stream of TDAParams instances represents the input of a TDAMachine. A subscriber on the PublishSubject writes each new TDAParams instance to the channel "ch_TDA_1" under the "params" key.

TODO: unsubscribe previous on re-evaluation


In [11]:
val in$ = PublishSubject[TDAParams]

in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))

Create an initial TDAParams instance. In the same cell, we submit the instance to the PublishSubject.

For the sake of illustration, we create an html snippet that listens to changes on the "ch_TDA_1" channel and displays the value of the "params" key.


In [12]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <div style='background: #FFB; padding: 10px;'>
        <span style='font-family: "Courier"'>[[params]]</span>
    </div>
</template>


Out[12]:

Notice that when we evaluate the TDAParams instantiation cells, the output of the yellow box changes.

Initialize RDD

In this example, we are using a synthetic circle-shaped 2D data set (circle.1k.csv).


In [13]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense

def readCircle(file: String) = 
    sc.
        textFile(file).
        map(_.split(",").map(trim)).
        zipWithIndex.
        map{ case (Array(x, y), idx) => dp(idx, dense(x.toDouble, y.toDouble))}
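
The parsing steps in readCircle (split, trim, index, convert) can be tried on an in-memory collection without Spark. This is a sketch under stated assumptions: Point is a hypothetical stand-in for the library's data point type, and unlike the cell above, which throws a MatchError on a malformed line, this version uses collect to skip lines that do not have exactly two fields.

```scala
object ParseSketch {
  // Hypothetical stand-in for the library's data point type: an index plus a feature vector.
  final case class Point(index: Long, features: Vector[Double])

  // Split each CSV line on commas, trim the fields, pair each line with its position,
  // and keep only the lines that have exactly two fields.
  def parse(lines: Seq[String]): Seq[Point] =
    lines
      .map(_.split(",").map(_.trim))
      .zipWithIndex
      .collect { case (Array(x, y), idx) => Point(idx.toLong, Vector(x.toDouble, y.toDouble)) }
}
```

Because the index is assigned before filtering, a skipped line leaves a gap in the indices rather than shifting the points that follow it.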

In [14]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"

val circle_1k_path = data_path + "circle.1k.csv"

val rdd = readCircle(circle_1k_path).cache

val ctx = TDAContext(sc, rdd)

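Turn a TDAResult into a nested Map of "nodes" and "edges", the data structure that is later set on the channel and bound to the plongeur-graph component.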


In [15]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString,
        "label"  -> c.id.toString,
        "size"   -> c.dataPoints.size,
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
    "edges" -> result.edges.map(e => {
      val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)}))
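
To make the shape of that structure concrete, here is a minimal sketch that runs without plongeur-spark. Cluster and Result are hypothetical stand-ins for the library's types, an edge is assumed to be a set of two cluster ids, and the random "x" and "y" coordinates are left out so the output is deterministic.

```scala
object FormatSketch {
  // Hypothetical stand-ins; the real TDAResult, cluster and edge types come from plongeur-spark.
  final case class Cluster(id: Int, dataPoints: Set[Int])
  final case class Result(clusters: Seq[Cluster], edges: Seq[Set[Int]])

  def format(result: Result): Map[String, Seq[Map[String, Any]]] = Map(
    "nodes" -> result.clusters.map(c =>
      Map[String, Any](
        "id"    -> c.id.toString,
        "label" -> c.id.toString,
        "size"  -> c.dataPoints.size)),
    "edges" -> result.edges.map { e =>
      // Sort the two endpoints so the edge id does not depend on Set iteration order.
      e.toSeq.sorted match {
        case Seq(from, to) =>
          Map[String, Any](
            "id"     -> s"$from--$to",
            "source" -> from.toString,
            "target" -> to.toString)
      }
    })
}
```

For two clusters with ids 1 and 2 joined by one edge, the "edges" entry holds a single map with id "1--2", source "1" and target "2".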

Run the machine, obtaining an Observable of (TDAParams, TDAResult) pairs.


In [16]:
val out$: Observable[(TDAParams, TDAResult)] = TDAMachine.run(ctx, in$)

In [17]:
val out$_subRef = SubRef()

In [18]:
out$_subRef.update(
    out$.subscribe(
        onNext = (t) => t match {case (p, r) => channel("ch_TDA_1").set("result", format(r))},
        onError = (e) => println(s"Error in TDA machine: $e")))

Reactive inputs

First, we set up a stream of updates to the BASE TDAParams instance.


In [19]:
val pipe$_subRef = SubRef()

val nrBins$ = PublishSubject[Int]

val overlap$ = PublishSubject[Percentage]

In [20]:
channel("ch_TDA_1").watch("nrBins",  (_: Any, v: Int) => nrBins$.onNext(v))
channel("ch_TDA_1").watch("overlap", (_: Any, v: Int) => overlap$.onNext(BigDecimal(v) / 100))

In [21]:
import TDAParams._

val BASE = 
    TDAParams(
        lens = TDALens(          
          Filter("PCA" :: 0 :: HNil, 10, 0.5)),
        clusteringParams = ClusteringParams(),
        scaleSelection = histogram(10))

val params$ =
    List(
        nrBins$.map(v => setFilterNrBins(0, v)),
        overlap$.map(v => setFilterOverlap(0, v))).
    reduce(_ merge _).
    scan(BASE)((params, fn) => fn(params))

pipe$_subRef.update(params$.subscribe(in$))

channel("ch_TDA_1").set("nrBins", BASE.lens.filters(0).nrBins)
channel("ch_TDA_1").set("overlap", (BASE.lens.filters(0).overlap * 100).toInt)
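
The core of the cell above is a stream of update functions that are folded, one at a time, onto a base value. The same idea can be sketched with plain Scala collections, where scanLeft stands in for the Observable's scan. Params, setNrBins and setOverlapPercent are hypothetical, simplified stand-ins for TDAParams, setFilterNrBins and setFilterOverlap.

```scala
object ScanSketch {
  // Hypothetical, simplified stand-in for TDAParams with a single filter.
  final case class Params(nrBins: Int, overlap: BigDecimal)

  // Each slider event becomes a function from the old params to the new params.
  type Update = Params => Params

  def setNrBins(v: Int): Update = _.copy(nrBins = v)

  // The slider reports whole percentages; the parameter is stored as a fraction.
  def setOverlapPercent(v: Int): Update = _.copy(overlap = BigDecimal(v) / 100)

  val base = Params(nrBins = 10, overlap = BigDecimal("0.5"))

  // scanLeft yields the seed followed by every intermediate state,
  // applying the updates in the order in which they arrive.
  def states(updates: Seq[Update]): Seq[Params] =
    updates.scanLeft(base)((params, fn) => fn(params))
}
```

Modelling each input as a function keeps the sliders independent: moving one slider changes only its own field, and every other field carries over from the previous state.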

We create two slider widgets that provide the inputs for the nrBins$ and overlap$ Observables.


In [22]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <table style="border-style: hidden;">
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">nr of bins</th>
            <td style="border-style: hidden;">
                <paper-slider min="0" max="100" step="1" value="{{nrBins}}"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[nrBins]]</td>
        </tr>
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">overlap</th>
            <td style="border-style: hidden;">
                <paper-slider min="0" max="75" step="1" value="{{overlap}}"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[overlap]]%</td>
        </tr>
    </table>        
</template>


Out[22]:

In [23]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph data="{{result}}"></plongeur-graph>
</template>


Out[23]:

In [24]:
val tdaParams =
      TDAParams(
        lens = TDALens(
          Filter("PCA" :: 0 :: HNil, 10, 0.25),
          Filter("PCA" :: 1 :: HNil, 10, 0.25)),
        clusteringParams = ClusteringParams(),
        scaleSelection = histogram(10))

in$.onNext(tdaParams)

In [25]:
ctx.dim


Out[25]:
2

In [35]:
import org.apache.spark.mllib.feature.{PCA, PCAModel}

val points = ctx.dataPoints

val dims = ctx.dim
    
val pcaModel = new PCA(dims).fit(points.map(_.features))
    
val pcaCoords = points.collect.map(dp => pcaModel.transform(dp.features).toArray.toList).toList

In [36]:
channel("data").set("pca", pcaCoords)

In [37]:
%%html
<template is='urth-core-bind' channel='data'>    
    <urth-viz-scatter
        datarows='[[pca]]'
        primary='0'
        secondary='1'        
        />
</template>


Out[37]:
